package com.tanx.cqrs.infrastructure.spring.command;

import com.tanx.cqrs.command.Command;
import com.tanx.cqrs.command.CommandBus;
import com.tanx.cqrs.command.CommandCallback;
import com.tanx.cqrs.command.handler.CommandHandlerResolver;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;

import java.util.concurrent.*;

/**
 * In-memory command bus.
 *
 * <p>Every command is handed to {@link CommandHandlerResolver#invoke} on a worker thread of an
 * internal cached thread pool. The pool is shut down when the bean is destroyed.
 *
 * <p>Failure contract: an exception thrown while handling a command is reported to the
 * {@link CommandCallback} and is <em>not</em> propagated to the caller; the synchronous and
 * future-based variants then yield {@code null} as the result.
 */
@Slf4j
public class MemoryCommandBus implements CommandBus, DisposableBean {

    /** Stateless logging callback shared by every call that does not supply its own callback. */
    private static final CommandCallback<Command, Object> DEFAULT_CALLBACK = new DefaultCommandCallBack();

    private final ExecutorService executorService = Executors.newCachedThreadPool();
    private final CommandHandlerResolver resolver;

    /**
     * Creates a bus that dispatches commands through the given resolver.
     *
     * @param resolver resolver used to invoke the handler of each command; must not be {@code null}
     */
    public MemoryCommandBus(@NonNull CommandHandlerResolver resolver) {
        this.resolver = resolver;
    }

    /**
     * Dispatches the command asynchronously and reports the outcome through the callback.
     *
     * @param command         command to execute; must not be {@code null}
     * @param commandCallback receives the result on success or the exception on failure;
     *                        must not be {@code null}
     */
    @Override
    public void send(@NonNull Command command, @NonNull CommandCallback<? super Command, Object> commandCallback) {
        // Fire-and-forget: execute() avoids creating a Future that nobody ever inspects.
        executorService.execute(() -> invoke(command, commandCallback));
    }

    /**
     * Dispatches the command and blocks until it has been processed.
     *
     * @param command command to execute; must not be {@code null}
     * @return the handler result, or {@code null} if the handler threw (the failure is logged)
     * @throws RuntimeException wrapping the underlying exception if waiting fails; when the
     *                          waiting thread is interrupted its interrupt flag is restored first
     */
    @Override
    public Object sendAndWait(@NonNull Command command) {
        Future<Object> pending = getFuture(command);
        try {
            return pending.get();
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Dispatches the command and blocks for at most the given time until it has been processed.
     * The command is not cancelled when the wait times out; it keeps running on the worker thread.
     *
     * @param command  command to execute; must not be {@code null}
     * @param time     maximum time to wait
     * @param timeUnit unit of {@code time}; must not be {@code null}
     * @return the handler result, or {@code null} if the handler threw (the failure is logged)
     * @throws RuntimeException wrapping the underlying exception if waiting fails or times out;
     *                          when the waiting thread is interrupted its interrupt flag is
     *                          restored first
     */
    @Override
    public Object sendAndWait(@NonNull Command command, long time, @NonNull TimeUnit timeUnit) {
        Future<Object> pending = getFuture(command);
        try {
            return pending.get(time, timeUnit);
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Dispatches the command asynchronously and exposes the outcome as a future.
     *
     * @param command command to execute; must not be {@code null}
     * @return a future completed with the handler result, or with {@code null} if the handler
     *         threw (the failure is logged)
     */
    @Override
    public CompletableFuture<Object> send(@NonNull Command command) {
        return CompletableFuture.supplyAsync(() -> invoke(command, DEFAULT_CALLBACK), executorService);
    }

    /**
     * Runs the command through the resolver and notifies the callback of the outcome.
     *
     * <p>The handler call and the callback notification are guarded separately, so a callback
     * that throws from {@code onSuccess} neither turns a successful command into a reported
     * failure nor discards its result.
     *
     * @return the handler result, or {@code null} if the handler threw
     */
    private Object invoke(Command command, CommandCallback<? super Command, Object> commandCallback) {
        Object result;
        try {
            result = resolver.invoke(command);
        } catch (Exception e) {
            notifyFailure(command, commandCallback, e);
            return null;
        }
        notifySuccess(command, commandCallback, result);
        return result;
    }

    /** Calls {@code onSuccess}; an exception thrown by the callback itself is logged, not propagated. */
    private void notifySuccess(Command command, CommandCallback<? super Command, Object> commandCallback,
                               Object result) {
        try {
            commandCallback.onSuccess(command, result);
        } catch (Exception e) {
            log.error("onSuccess callback failed for command {}", command, e);
        }
    }

    /** Calls {@code onFailure}; an exception thrown by the callback itself is logged, not propagated. */
    private void notifyFailure(Command command, CommandCallback<? super Command, Object> commandCallback,
                               Throwable failure) {
        try {
            commandCallback.onFailure(command, failure);
        } catch (Exception e) {
            log.error("onFailure callback failed for command {}", command, e);
        }
    }

    /** Submits the command with the default logging callback and returns the pending result. */
    private Future<Object> getFuture(Command command) {
        Callable<Object> task = () -> invoke(command, DEFAULT_CALLBACK);
        return executorService.submit(task);
    }

    /**
     * Initiates an orderly shutdown of the worker pool: already submitted commands still run,
     * new submissions are rejected.
     */
    @Override
    public void destroy() throws Exception {
        executorService.shutdown();
    }

    /** Callback used when the caller supplies none; it only logs the outcome. */
    private static final class DefaultCommandCallBack implements CommandCallback<Command, Object> {

        @Override
        public void onSuccess(Command command, Object result) {
            log.info("execute command {} result {}", command, result);
        }

        @Override
        public void onFailure(Command command, Throwable throwable) {
            // The throwable is passed as the trailing argument without a placeholder so that
            // SLF4J logs it with its stack trace instead of leaving an unfilled "{}".
            log.error("execute command {} failed", command, throwable);
        }
    }
}
